Skip to content

Nexus: worker, workflow-backed operations, and workflow caller #813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 129 commits into
base: main
Choose a base branch
from

Conversation

dandavison
Copy link
Contributor

@dandavison dandavison commented Apr 2, 2025

Initial Python Temporal Nexus implementation.

Temporal SDK PR to accompany

Contains Nexus worker, components for users to define workflow-backed Nexus operations, and the ability to start and cancel a Nexus operation from a workflow.

Notes for reviewers

  • nexusrpc.handler.start_workflow is a top-level static function, but currently there's still a public contextvar object nexusrpc.handler.temporal_operation_context. Let's settle on approach there. If we're using module-level getters I think they need to be named something like get_client() etc, despite the fact that we have activity.metric_meter() ? Little bit more discussion needed there.
  • I'll do another cleanup pass on docstrings, API docs, etc.

@dandavison dandavison force-pushed the nexus branch 2 times, most recently from a845a6f to 7a121bb Compare April 5, 2025 15:08
@dandavison dandavison force-pushed the nexus branch 10 times, most recently from 862e4f9 to 8ac0192 Compare April 17, 2025 12:17
@dandavison dandavison force-pushed the nexus branch 4 times, most recently from 8940d51 to 520aecf Compare April 26, 2025 16:16
@dandavison dandavison force-pushed the nexus branch 2 times, most recently from 2fc971f to 8bd6011 Compare April 29, 2025 13:11
@dandavison dandavison changed the title Nexus prototype Nexus May 8, 2025
@dandavison dandavison force-pushed the nexus branch 2 times, most recently from 5d54f48 to 8ddfb94 Compare May 24, 2025 01:49
@dandavison dandavison force-pushed the nexus branch 7 times, most recently from fa3e8ec to f7bf47b Compare May 27, 2025 20:34
Copy link
Member

@cretz cretz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added another round of review. Note, there are some comments from previous rounds that are still valid/unaddressed.

Comment on lines 377 to 378
self._type = type
self._retryable = retryable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify what I'm saying here:

Suggested change
self._type = type
self._retryable = retryable
self.type = type
self.retryable = retryable

We should not hide the attributes from the user, same for operation error which it does appear we added property getters for (I'm fine with either property getters or visible/documented attributes)

Comment on lines +6 to +11
from ._operation_context import (
_TemporalCancelOperationContext as _TemporalCancelOperationContext,
)
from ._operation_context import (
_TemporalStartOperationContext as _TemporalStartOperationContext,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we intentionally exposing _ prefixed types?

):
ctx = _try_temporal_context()
if ctx is None:
raise RuntimeError("Not in Nexus operation context.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to expose a in_handler() or something similarly named akin to in_activity() and in_workflow() that exist today

"""The task queue of the worker handling this Nexus operation."""


def info() -> Info:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to open issue with other SDKs for this feature, they have no concept of accessible "info" inside handlers

return workflow_handle


@dataclass(frozen=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO not much value making this a dataclass, just a constructor that is marked "don't use" accepting the context and storing as an attr I think is good

@@ -99,6 +99,7 @@ def env_type(request: pytest.FixtureRequest) -> str:
@pytest_asyncio.fixture(scope="session")
async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
if env_type == "local":
http_port = 7243
Copy link
Member

@cretz cretz Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use the official way of starting/cancelling Nexus operations via workflows instead of this less-official HTTP manner? Is there something that HTTP gives you that Nexus client in the workflow doesn't that you need to test? EDIT: I see we have another thread at #813 (comment) that overlaps this from before, can discuss there.

Comment on lines +22 to +23
# TODO(nexus-preview): How do we recommend that users create endpoints in their own tests?
# See https://github.com/temporalio/sdk-typescript/pull/1708/files?show-viewed-files=true&file-filters%5B%5D=&w=0#r2082549085
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is important. We need to improve the Nexus testing story. We should consider accepting Nexus endpoints on WorkflowEnvironment.start_local and WorkflowEnvironment.start_time_skipping and maybe even expose a helper creation method on the environment itself for use post-start. We can move some of that into Core or even into the CLI dev server and Java test server, that can remain to be seen.

Also, can you confirm that immediately upon create_nexus_endpoint return the endpoint can be used? In other cases (create namespace, create search attribute) there have been issues in the past with eventual consistency.

Comment on lines +66 to +88
async def fetch_operation_info(
self,
operation: str,
token: str,
) -> httpx.Response:
async with httpx.AsyncClient() as http_client:
return await http_client.get(
f"{self.server_address}/nexus/endpoints/{self.endpoint}/services/{self.service}/{operation}",
# Token can also be sent as "Nexus-Operation-Token" header
params={"token": token},
)

async def fetch_operation_result(
self,
operation: str,
token: str,
) -> httpx.Response:
async with httpx.AsyncClient() as http_client:
return await http_client.get(
f"{self.server_address}/nexus/endpoints/{self.endpoint}/services/{self.service}/{operation}/result",
# Token can also be sent as "Nexus-Operation-Token" header
params={"token": token},
)
Copy link
Member

@cretz cretz Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two are unused, correct? I don't think these are considered stable/supported at this time


namespace: str
workflow_id: str
_type: OperationTokenType = OPERATION_TOKEN_TYPE_WORKFLOW
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hidden/private fields are confusing on user-facing dataclasses IMO. Can this be removed?

await _test_start_operation(test_case, True, env)


async def _test_start_operation(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure it's too late to change. Ideally we start Nexus like a user does, we cancel Nexus like a user does. To use advanced/secret ways to do this makes these tests hard to give to users as examples of how they might test (though that's obviously not the goal of the tests, it's a nice benefit). If something is impossible via workflow caller, we should not be using it since a user can't either.

For the most part, everything we do in a test a user should be able to do in their tests too. It has the added benefit of confirming how a user might test these things and spotting any gaps. If you are unable to use existing pieces to test the user-facing components and have to resort to HTTP, what are we going to tell users when they encounter the same situation and want to test the same thing the same way?

Comment on lines +105 to +106
# TODO(nexus-prerelease): do we need to track cancel operation
# tasks as we do start operation tasks?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we do IMO. Cancels can be canceled (i.e. timeout from Core POV)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants